feat: add lifecycle-bound SSE stream and typed per-endpoint adapter#153
Merged
Conversation
Introduce SseStream: an AutoCloseable Iterable<ServerSentEvent> that owns
the underlying HTTP response. Closing the stream — explicitly, via use {} /
try-with-resources, or implicitly when iteration runs to completion — closes
the response and releases its connection, so a partial consume never strands
the body. This mirrors the close-on-partial-consume invariant PagedIterable
enforces. The previously doc-only "do not iterate twice" warning is now an
enforced single-pass guard, and iteration after close is rejected.
Add a reusable per-endpoint adapter, TypedSseStream<T>, that maps raw events
to typed models via a caller-supplied SseEventMapper. The mapper receives the
event name and joined data and returns a decoded value, Skip, or a Done
sentinel; it is the seam where the Serde SPI is invoked and where per-API
done-sentinel and error-envelope conventions live. Mapping is applied lazily,
one element at a time, so a partial consume decodes only the events taken.
Closing the typed adapter propagates to the underlying stream.
Both surfaces are hand-written runtime primitives usable today; a code
generator can target them later without embedding any per-API convention in
core.
Closes #35
Closes #62
SseStream and TypedSseStream only released the underlying response when
iteration reached a clean end-of-stream. A mid-stream reader failure (a
dropped connection) or a throwing event mapper propagated the error without
closing, so any consumer that iterated without `use {}` — a bare for-loop or
`toList()` — stranded the response body and its pooled connection. The
success path auto-closed, which made bare iteration look safe right up until
an error hit.
Release the response on those error paths too. Close is idempotent, so a
surrounding `use {}` still works; a failure to release is attached to the
original error as a suppressed throwable instead of masking it.
Make automatic end-of-stream cleanup tolerant of a close failure as well: the
events were already delivered, so a failing resource close on the final pull
(clean EOS, or a TypedSseStream done-sentinel) must not turn a fully-read
stream into a thrown result and discard the collected events. An explicit
close() still propagates a release failure — that is the caller asking to
release, so they own it.
Drop the redundant ReentrantLock around the close: the AtomicBoolean
compare-and-set already guarantees the resource is closed exactly once, so
only the CAS winner ever entered the lock. Clarify the threading docs —
cancelling a stream blocked inside an in-flight read surfaces as an
IOException to the iterating thread, not a clean end.
Automatic end-of-stream cleanup swallows a failure to release the response, because the events have already been delivered and letting the failure propagate would discard a fully-read stream. Swallowing it silently hides a real I/O problem, so emit it through ClientLogger at WARN (event `sse.close.failed`) with the cause attached. An explicit close() still propagates the failure, and the suppressed-onto-the-primary path for an in-flight reader/mapper error is unchanged. Centralize the quiet release in SseStream.releaseQuietly so the TypedSseStream done-sentinel and mapper-error paths share that one site instead of duplicating the close handling. The regenerated API snapshot reflects the logger now threaded through SseStream's private constructor; the public factory surface is unchanged.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds a lifecycle-bound Server-Sent Events stream and a reusable typed adapter, so streaming endpoints can be consumed safely and decoded into models without per-API conventions leaking into core.
SseStream— AutoCloseable Iterable bound to the response (#35)The SSE surface was a bare
Sequencewhose reader explicitly disclaimed ownership of the response, so a partial consume could strand the connection.SseStreamnow wraps the WHATWG parser and owns the response:AutoCloseable, Iterable<ServerSentEvent>. Closing it — explicitly, viause {}/ try-with-resources, or implicitly when iteration runs to completion — closes the underlying response/body and releases its pooled connection. This mirrors the close-on-partial-consume invariantPagedIterableenforces.close()is rejected;close()is idempotent and safe to call concurrently (e.g. cancellation from another thread).use {}never strands the connection; any failure to release is attached to the reader error as a suppressed throwable.Response.sseStream()extension opens a stream bound to the response body lifecycle.TypedSseStream<T>+SseEventMapper<T>— per-endpoint typed adapter (#62)A reusable runtime adapter turning
SseStreamintoAutoCloseable Iterable<T>by applying a caller-supplied(eventName, data) -> Result<T>mapper:Skip(keep-alives, bare cursors), or aDonesentinel that ends the stream and closes it. It is the seam where theSerdeSPI is invoked and where per-API done-sentinel / error-envelope conventions live — core holds none of them.Serdedeserialize inside it) runs only when the consumer pulls the next element, so a partial consume decodes only the events taken.SseStream.typed(mapper)extension for ergonomic wrapping.This is a hand-written runtime primitive — no code generation — fully usable today; a generator can target it later without embedding any per-API convention in core.
Tests
SseStreamTestandTypedSseStreamTestcover: full-iteration auto-close, explicit close,use {}close on partial consume, idempotent close, single-pass and is-closed guards, out-of-band close mid-iteration, reader-exception propagation,Response.sseStream()lifecycle binding and the no-body error, typed mapping via a recordingDeserializer, lazy per-element decode,Skip/Donehandling, mapper-exception propagation, and close propagation.Gated build (scoped,
--no-daemon)Result: BUILD SUCCESSFUL.
:sdk-core:apiDumpwas run and the regeneratedsdk-core/api/sdk-core.apiis committed.Closes #35
Closes #62